{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Pipelines - Pushing Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can also create pipelines where we **push** data through multiple stages of this pipeline, using `send`, so, essentially, using coroutines."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First let's create a simple decorator to auto-prime our coroutines:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def coroutine(coro):\n",
    "    def inner(*args, **kwargs):\n",
    "        gen = coro(*args, **kwargs)\n",
    "        next(gen)\n",
    "        return gen\n",
    "    return inner"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's start with a data consumer generator that will simply print what it receives - but it could equally well write data to a file, a database, or other processing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "@coroutine\n",
    "def handle_data():\n",
    "    while True:\n",
    "        received = yield\n",
    "        print(received)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's write a coroutine that will receive some data, transform it, and send it along to the next generator:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import math\n",
    "\n",
    "@coroutine\n",
    "def power_up(n, next_gen):\n",
    "    while True:\n",
    "        received = yield\n",
    "        output = math.pow(received, n)\n",
    "        next_gen.send(output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We are going to generate some data, send it to `power_up`, and specify the next stage as being `handle_data`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1.0\n",
      "4.0\n",
      "9.0\n",
      "16.0\n",
      "25.0\n"
     ]
    }
   ],
   "source": [
    "print_data = handle_data()\n",
    "gen = power_up(2, print_data)\n",
    "# pipeline: gen --> print_data\n",
    "for i in range(1, 6):\n",
    "    gen.send(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Ok, as you can see we are now **pushing** data through this pipeline.\n",
    "\n",
    "But why stop there? Let's add another `power_up` in the pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1.0\n",
      "64.0\n",
      "729.0\n",
      "4096.0\n",
      "15625.0\n"
     ]
    }
   ],
   "source": [
    "print_data = handle_data()\n",
    "gen2 = power_up(3, print_data)\n",
    "gen1 = power_up(2, gen2)\n",
    "# pipeline: gen1 --> gen2 --> print_data\n",
    "for i in range(1, 6):\n",
    "    gen1.send(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's add a filter to our pipeline that will only retain even values:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "@coroutine\n",
    "def filter_even(next_gen):\n",
    "    while True:\n",
    "        received = yield\n",
    "        if received %2 == 0:\n",
    "            next_gen.send(received)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And let's insert it as the final stage of our pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "64.0\n",
      "4096.0\n"
     ]
    }
   ],
   "source": [
    "print_data = handle_data()\n",
    "filtered = filter_even(print_data)\n",
    "gen2 = power_up(3, filtered)\n",
    "gen1 = power_up(2, gen2)\n",
    "\n",
    "# pipeline: gen1 --> gen2 --> filtered --> print_data\n",
    "\n",
    "for i in range(1, 6):\n",
    "    gen1.send(i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So as you can see we can easily push data through our pipeline as well."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
